package com.hugy.test.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.client.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Configuration
@Slf4j
public class RestEsClientConfig {

    @Value("${elasticsearch.host}")
    private String host;

    private RestClient restClient;
    private RestHighLevelClient restHighLevelClient;

    @Bean(name = "myRestClient")
    @Scope("singleton")
    public RestClient getRestClient(){
        return restClient;
    }

    @Bean(name = "restHighLevelClient")
    @Scope("singleton")
    public RestHighLevelClient getRhlClient(){
        return restHighLevelClient;
    }

    @PostConstruct
    public void initRestClientConfig() {
        //基本的连接
//        RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(host, port));
        RestClientBuilder clientBuilder = RestClient.builder(httpHost());
        //设置连接超时和套接字超时
        clientBuilder.setRequestConfigCallback(builder ->{
            builder.setConnectTimeout(3000)
                    .setSocketTimeout(30000)
                    .setConnectionRequestTimeout(5000);
            return builder;
        });
        //设置节点选择器
        clientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
        //配置HTTP异步请求ES的线程数
        clientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                httpAsyncClientBuilder.setMaxConnTotal(30).setMaxConnPerRoute(5)
                        .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
                return httpAsyncClientBuilder;
        });
        //设置监听器，每次节点失败都可以监听到，可以作额外处理
        clientBuilder.setFailureListener(new RestClient.FailureListener() {
            @Override
            public void onFailure(Node node) {
                super.onFailure(node);
                log.error(node.getHost() + "--->该节点失败了");
            }
        });
        restClient = clientBuilder.build();
        restHighLevelClient = new RestHighLevelClient(clientBuilder);
    }

    private HttpHost[] httpHost(){
        //解析hostlist配置信息
        String[] split = host.split(",");
        //创建HttpHost数组，其中存放es主机和端口的配置信息
        HttpHost[] httpHostArray = new HttpHost[split.length];
        for(int i=0;i<split.length;i++){
            String item = split[i];
            httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
        return httpHostArray;
    }

    @PreDestroy
    public void closeRestClient() {
        try {
            if(restClient != null){
                restClient.close();
            }
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}